ARGUS - Security Agent

**ARGUS - Security Agent** is an advanced AI-native security specialist agent designed to perform multi-tool semantic vulnerability analysis. It goes beyond simple regex matching by using DeepSeek LLM reasoning to validate findings, eliminate false positives, and identify complex attack chains.


📑 Table of Contents

  1. Overview
  2. Architecture
  3. Features
  4. Project Structure
  5. Installation
  6. Scanning Modes (PR vs. Nightly)
  7. Monitor Communication
  8. Data Formats
  9. AI & Memory Storage
  10. Human-in-the-Loop & Expert Feedback

🌎 Overview

The Security Agent acts as a specialist in an Agent-to-Agent (A2A) ecosystem. It receives requests from an Orchestrator containing new code changes, relevant symbols, and the context of the blast radius.

Instead of just reporting raw tool output, the agent coordinates multiple security engines (Semgrep, Gitleaks, Trivy, ZAP), scores their findings against a historical memory, and then uses AI Reflexion loops to provide high-quality, actionable remediation advice.


🏗️ Architecture

┌─────────────────────────────────────────────────────────┐
│                    Orchestrator / CI                    │
│                (sends A2A/1.0 JSON task)                │
└──────────────────────┬──────────────────────────────────┘
                       │  A2A Protocol
                       ▼
┌─────────────────────────────────────────────────────────┐
│                 Security Agent (ARGUS)                  │
│                                                         │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌─────────┐  │
│  │ Semgrep  │  │ Gitleaks │  │  Trivy   │  │   ZAP   │  │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬────┘  │
│       └─────────────┴─────────────┴─────────────┘       │
│                      Normalizer                         │
│                          │                              │
│              ┌───────────▼───────────┐                  │
│              │    LLM (DeepSeek)     │                  │
│              │   + Reflexion Loop    │                  │
│              └───────────┬───────────┘                  │
│                          │                              │
│              ┌───────────▼───────────┐                  │
│              │     Memory Store      │                  │
│              │   (history.json)      │                  │
│              └───────────┬───────────┘                  │
└──────────────────────────┼──────────────────────────────┘
                           │ Final JSON Report
              ┌────────────▼────────────┐
              │  Monitoring Backend     │◄── HTTP lifecycle events
              │  + Langfuse (LLM trace) │◄── Trace / Span / Generation
              └─────────────────────────┘

✨ Features

| Feature | Description |
| --- | --- |
| 🔍 Semantic Analysis | Uses DeepSeek LLM to read the code surrounding a finding and verify its validity |
| 📜 Policy Enforcement | Automatically checks findings against custom company security policies |
| 🧠 Historical Memory | Remembers recurring findings and human verdicts to reduce noise in future scans |
| 🔄 A2A Protocol | Fully integrated with the A2A/1.0 SDK for autonomous task execution |
| 🛡️ Multi-Tool Orchestration | Unified management of SAST, SCA, Secrets, and DAST tools |
| 📡 Monitor Communication | Reports lifecycle events to a backend and full LLM traces to Langfuse |
| 🔁 Reflexion Loop | High-priority scans trigger a second LLM pass to self-critique and improve accuracy |

📁 Project Structure

Security_Agent/
│
├── main.py                  # Entry point — A2A server & task handler
├── agent_core.py            # Core SecurityReviewerAgent orchestration logic
├── executor.py              # A2A executor / task dispatch
├── schemas.py               # Pydantic models for all data structures
├── normalizer.py            # Unifies raw tool outputs into a standard format
├── scorer.py                # Confidence scoring engine
├── config.py                # Environment & config loader
├── expert_patterns.txt      # Expert-injected security rules for LLM prompt
├── requirements.txt         # Python dependencies
│
├── llm/
│   ├── agent.py             # DeepSeek LLM client + Reflexion loop
│   └── prompts.py           # System & user prompt builders
│
├── tools/
│   ├── runner.py            # Orchestrates all tool runs in parallel
│   ├── semgrep.py           # Semgrep SAST wrapper
│   ├── gitleaks.py          # Gitleaks secrets scanner wrapper
│   ├── trivy.py             # Trivy SCA (CVE) wrapper
│   └── zap.py               # OWASP ZAP DAST wrapper
│
├── memory/
│   ├── store.py             # Read/write historical verdicts
│   └── history.json         # Persistent memory of past findings & verdicts
│
├── monitoring/
│   ├── monitoring_client.py # HTTP client → Monitoring Backend lifecycle events
│   └── langfuse_client.py   # Langfuse SDK wrapper → LLM trace/span/generation
│
├── report/
│   └── generator.py         # Builds the flat JSON report returned to the Orchestrator
│
└── test_samples/            # Sample inputs for local testing

⚙️ Installation

Prerequisites

Before installing the agent, ensure your system meets the requirements:

  • OS: Ubuntu 22.04+ (Preferred) or macOS
  • Python: 3.11+
  • External tools (must be in PATH):
| Tool | Purpose | Install |
| --- | --- | --- |
| Semgrep | SAST (static analysis) | pip install semgrep |
| Gitleaks | Secrets scanning | GitHub Releases |
| Trivy | SCA / CVE scanning | Installation Guide |
| OWASP ZAP | DAST (staging only) | zaproxy.org |
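
A quick way to confirm the binaries are discoverable before running the agent (a minimal sketch, not part of the repository; "zap.sh" is an assumed launcher name and depends on how you installed ZAP):

# Minimal PATH sanity check for the external scanners (illustrative only).
import shutil

for tool in ("semgrep", "gitleaks", "trivy", "zap.sh"):  # "zap.sh" assumed; adjust to your ZAP install
    location = shutil.which(tool)
    print(f"{tool:10s} -> {location or 'NOT FOUND in PATH'}")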

Step-by-Step Setup

# 1. Clone the repository
git clone https://github.com/IBAA05/Security_Agent.git
cd Security_Agent

# 2. Create and activate a virtual environment (Python 3.11+)
python3.11 -m venv venv
source venv/bin/activate

# 3. Install Python dependencies
pip install -r requirements.txt

# 4. Configure environment variables
cp .env.example .env
# Edit .env and fill in the required values:
#   DEEPSEEK_API_KEY       — your DeepSeek API key
#   MONITORING_BACKEND_URL — URL of the monitoring backend (default: http://localhost:8000)
#   LANGFUSE_PUBLIC_KEY    — Langfuse public key
#   LANGFUSE_SECRET_KEY    — Langfuse secret key
#   LANGFUSE_HOST          — Langfuse host (default: https://cloud.langfuse.com)
#   ZAP_HOST               — ZAP proxy host (staging only)
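
A minimal sketch of how these variables might be read at startup (the actual loader lives in config.py and may differ; the python-dotenv usage is an assumption):

# Illustrative config loading only; see config.py for the real implementation.
import os
from dotenv import load_dotenv  # assumes python-dotenv is installed

load_dotenv()  # read .env into the process environment

DEEPSEEK_API_KEY       = os.getenv("DEEPSEEK_API_KEY", "")
MONITORING_BACKEND_URL = os.getenv("MONITORING_BACKEND_URL", "http://localhost:8000")
LANGFUSE_PUBLIC_KEY    = os.getenv("LANGFUSE_PUBLIC_KEY", "")
LANGFUSE_SECRET_KEY    = os.getenv("LANGFUSE_SECRET_KEY", "")
LANGFUSE_HOST          = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
ZAP_HOST               = os.getenv("ZAP_HOST", "")  # only needed for staging (DAST) scans

if not DEEPSEEK_API_KEY:
    raise RuntimeError("DEEPSEEK_API_KEY is required")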

🚀 Scanning Modes (PR vs. Nightly)

The agent adjusts its intensity based on the environment flag:

| Mode | Environment Flag | Tools Used | Speed | Purpose |
| --- | --- | --- | --- | --- |
| PR Scan | pr | Semgrep, Gitleaks, Trivy | ⚡ Fast | Block vulnerable code before merge |
| Nightly / Staging | staging | Above + OWASP ZAP | 🐢 Slow | Detect live runtime flaws (DAST) on deployment |
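
A minimal sketch of how the flag might translate into a tool list (illustrative; the real dispatch lives in tools/runner.py):

# Illustrative mode-to-tool mapping; actual selection logic lives in tools/runner.py.
def select_tools(environment: str) -> list[str]:
    base = ["semgrep", "gitleaks", "trivy"]   # fast scanners used on every PR
    if environment == "staging":              # nightly / staging runs add DAST
        return base + ["zap"]
    return base

assert select_tools("pr") == ["semgrep", "gitleaks", "trivy"]
assert select_tools("staging") == ["semgrep", "gitleaks", "trivy", "zap"]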

📡 Monitor Communication

The Security Agent maintains two parallel monitoring channels that report its status and behaviour in real time. Both channels are non-blocking — a failure in monitoring will never crash or delay the security pipeline.


1. Backend Monitoring (HTTP)

File: monitoring/monitoring_client.py

The agent sends lifecycle events to the Monitoring Backend via HTTP POST requests. The backend URL defaults to http://localhost:8000 and can be overridden with the MONITORING_BACKEND_URL environment variable.

Lifecycle Event Flow

Agent receives task
      │
      ▼
POST /agents/received    ← task has been received
      │
      ▼
POST /agents/start       ← analysis pipeline is starting
      │
      ├── (on success) ──►  POST /agents/complete   ← results ready
      │
      └── (on error)   ──►  POST /agents/error      ← unrecoverable failure

Endpoint Reference

| Endpoint | Function | Payload |
| --- | --- | --- |
| POST /agents/received | report_received(pr_id) | { pr_id, agent_type } |
| POST /agents/start | report_start(pr_id) | { pr_id, agent_type } |
| POST /agents/complete | report_complete(pr_id, ...) | { pr_id, agent_type, final_score, confidence_score, findings_count, risk_score, report } |
| POST /agents/error | report_error(pr_id, error_message) | { pr_id, agent_type, error_message } |

Note: agent_type is always "security_agent" for this agent.
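
Each endpoint maps onto a small HTTP helper. A minimal sketch of one of them, assuming the requests library and swallowing network failures so monitoring can never break the scan (the real client is monitoring/monitoring_client.py):

# Illustrative lifecycle reporter; the real implementation is monitoring/monitoring_client.py.
import os
import requests

BACKEND_URL = os.getenv("MONITORING_BACKEND_URL", "http://localhost:8000")
AGENT_TYPE = "security_agent"

def report_error(pr_id: int, error_message: str) -> None:
    """POST /agents/error; best-effort, so a monitoring outage never raises."""
    try:
        requests.post(
            f"{BACKEND_URL}/agents/error",
            json={"pr_id": pr_id, "agent_type": AGENT_TYPE, "error_message": error_message},
            timeout=5,
        )
    except requests.RequestException:
        pass  # monitoring is non-blocking by design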

Example Usage

from monitoring import report_received, report_start, report_complete, report_error

report_received(pr_id=42)
report_start(pr_id=42)

# ... run the full scan pipeline ...

report_complete(
    pr_id=42,
    final_score=0.87,
    confidence_score=0.91,
    findings_count=5,
    risk_score=0.72,
    report={"overall_risk": "HIGH"},  # full report dict truncated here
)

2. LLM Observability (Langfuse)

File: monitoring/langfuse_client.py

Every LLM call, pipeline stage, and overall scan trace is recorded in Langfuse for full observability — including latency, token usage, and cost tracking.

Observability Hierarchy

Trace  (one per scan / review() call)
  │
  ├── Span: memory_context       ← fetch historical verdicts
  ├── Span: tool_runner          ← run Semgrep / Gitleaks / Trivy / ZAP
  ├── Span: scorer               ← apply confidence scoring
  ├── Span: llm_analysis         ← main DeepSeek LLM call
  │     └── Generation           ← token counts, model, prompt/response
  ├── Span: reflexion            ← (optional) second LLM self-critique pass
  ├── Span: report_generation    ← build final JSON report
  └── Span: memory_update        ← persist new verdicts to history.json

Key Methods

Langfuse v4 note: start_trace() now returns a trace_id string (not an object).
All subsequent calls (start_span, log_generation, end_trace) receive this trace_id.
The same trace_id is sent to the monitoring backend so both systems can be linked.

| Method | Purpose |
| --- | --- |
| langfuse_client.start_trace(...) | Open a root trace — returns a trace_id string |
| langfuse_client.end_trace(trace_id, output) | Close the trace with final risk metrics |
| langfuse_client.start_span(trace_id, name, input_data) | Open a pipeline stage span |
| langfuse_client.end_span(span, output) | Close a pipeline stage with its output |
| langfuse_client.log_generation(trace_id, model, ...) | Record an individual LLM call |
| langfuse_client.flush() | Force-flush all buffered events (use in tests) |

Example Usage

from monitoring import langfuse_client

# Start trace — returns a trace_id string in v4
trace_id = langfuse_client.start_trace(
    scan_id="scan-42",
    correlation_id="job_ref_001",
    intent="REVIEW_AUTH_MODULE",
    priority="HIGH",
    environment="pr",
    files_count=3,
    needs_reflexion=True,
)

# Wrap each pipeline stage in a span
span = langfuse_client.start_span(trace_id, name="tool_runner", input_data={"files": 3})
# ... run tools ...
langfuse_client.end_span(span, output={"findings": 7})

# Record LLM call
langfuse_client.log_generation(
    trace_id,
    name="main_analysis",
    model="deepseek-chat",
    input_messages=[{"role": "system", "content": "..."}, ...],
    output_text="...",
    usage={"input": 1200, "output": 450, "total": 1650},
)

# Close the trace
langfuse_client.end_trace(trace_id, output={
    "overall_risk":       "HIGH",
    "confirmed_findings": 4,
    "false_positives":    1,
    "critical_count":     1,
    "high_count":         3,
    "confidence":         0.91,
})

langfuse_client.flush()

📊 Data Formats

A2A Protocol Input (JSON)

The Security Agent follows the A2A/1.0 formal communication protocol. The Orchestrator sends a rich context message including the knowledge graph and active problem set:

{
  "protocol": "A2A/1.0",
   "pr_id": 42,
  "metadata": {
    "message_id": "msg_987654321",
    "correlation_id": "job_ref_argus_001",
    "timestamp": "2026-02-24T14:30:00Z",
    "sender": "orchestrator-service",
    "version": "1.2.0"
  },
  "routing_instructions": {
    "priority": "HIGH",
    "target_specialist": "Security_Reviewer",
    "ttl_seconds": 3600
  },
  "payload": {
    "intent": "REFACTOR_AUTH_PIPELINE",
    "knowledge_graph": {
      "nodes": [
        {
          "id": "A",
          "file": "auth_service.py",
          "role": "PRIMARY_SOURCE",
          "symbols_changed": ["verify_jwt"],
          "logic_delta": "Changed algorithm from HS256 to RS256."
        },
        {
          "id": "B",
          "file": "config_loader.py",
          "role": "DEPENDENCY",
          "impact": "Now requires public_key path in environment variables.",
          "status": "AFFECTED_BUT_NOT_MODIFIED"
        }
      ]
    },
    "dehydrated_content": {
      "high_signal_code": [
        {
          "file": "auth_service.py",
          "snippet": "def verify_jwt(token):\n    # logic changes here..."
        }
      ],
      "policy_constraints": [
        "Security Standard v4: All RSA keys must be 4096-bit."
      ]
    },
    "active_problem_set": [
      {
        "type": "LOGIC_INCONSISTENCY",
        "location": "config_loader.py:45",
        "problem": "Loader expects HMAC secret; new logic expects RSA Public Key.",
        "remediation_hint": "Update ConfigLoader to support .pem file loading."
      }
    ]
  }
}
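
A partial sketch of how this envelope might map onto Pydantic models (the authoritative definitions live in schemas.py; the field selection here is illustrative):

# Partial, illustrative Pydantic models for the A2A envelope; see schemas.py for the real ones.
from pydantic import BaseModel

class Metadata(BaseModel):
    message_id: str
    correlation_id: str
    timestamp: str
    sender: str
    version: str

class RoutingInstructions(BaseModel):
    priority: str            # e.g. "HIGH"
    target_specialist: str   # e.g. "Security_Reviewer"
    ttl_seconds: int

class A2AMessage(BaseModel):
    protocol: str            # "A2A/1.0"
    pr_id: int
    metadata: Metadata
    routing_instructions: RoutingInstructions
    payload: dict            # knowledge_graph, dehydrated_content, active_problem_set

# Usage (Pydantic v2): A2AMessage.model_validate(task_dict), where task_dict is the parsed JSON above.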

Tool Result Format (Normalized Bundle)

Before sending data to the LLM, the agent aggregates and normalizes findings from all active tools into a single bundle. Findings from every tool category (SAST, Secrets, SCA, DAST) are mapped to this schema:

[
  {
    "tool": "gitleaks",
    "rule_id": "generic-api-key",
    "title": "Hardcoded secret: Generic API Key",
    "severity": "HIGH",
    "file_path": "auth_service.py",
    "line_start": 12,
    "evidence": "Match at line 12 — secret redacted",
    "owasp_category": "A07",
    "cwe": "CWE-798"
  },
  {
    "tool": "semgrep",
    "rule_id": "python.jwt.security.audit.jwt-decode-without-verify",
    "title": "JWT Decode without Verification",
    "severity": "HIGH",
    "file_path": "auth_service.py",
    "line_start": 35,
    "evidence": "jwt.decode(token, options={'verify_signature': False})",
    "owasp_category": "A07",
    "cwe": "CWE-287"
  },
  {
    "tool": "trivy",
    "rule_id": "CVE-2023-36478",
    "title": "Jetty: HTTP/2 DoS vulnerability",
    "severity": "MEDIUM",
    "file_path": "requirements.txt",
    "evidence": "jetty-server 9.4.51.v20230217 → fix: 9.4.52.v20230823",
    "owasp_category": "A06",
    "cwe": "CWE-400"
  },
  {
    "tool": "zap",
    "rule_id": "zap-40012",
    "title": "Reflected Cross-Site Scripting (XSS)",
    "severity": "HIGH",
    "file_path": null,
    "evidence": "<script>alert(1)</script>",
    "owasp_category": "A03",
    "cwe": "CWE-79",
    "description": "ZAP found an XSS vulnerability on the live staging endpoint."
  }
]
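
A small sketch of the same finding shape as a typed model (field names mirror the JSON above; the canonical schema lives in schemas.py):

# Illustrative typed view of one normalized finding; the canonical model lives in schemas.py.
from typing import Optional
from pydantic import BaseModel

class NormalizedFinding(BaseModel):
    tool: str                        # "semgrep" | "gitleaks" | "trivy" | "zap"
    rule_id: str
    title: str
    severity: str                    # "LOW" | "MEDIUM" | "HIGH" | "CRITICAL"
    file_path: Optional[str] = None  # ZAP findings may have no file (live endpoint)
    line_start: Optional[int] = None
    evidence: str
    owasp_category: str              # e.g. "A07"
    cwe: str                         # e.g. "CWE-798"
    description: Optional[str] = None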

Final JSON Report

The final output returned to the Orchestrator (flat format):

{
  "pr_id": 42,
  "scan_id": "msg_987654321",
  "summary": "Critical security issues found in auth_service.py: hardcoded secrets, SQL injection, MD5 password hashing.",
  "overall_risk": "CRITICAL",
  "findings": [
    {
      "rule_id": "generic-api-key",
      "severity": "CRITICAL",
      "base_confidence": 1.0,
      "final_confidence": 1.0,
      "is_false_positive": false,
      "reasoning": "Hardcoded SECRET_KEY at line 7 is a real secret, not a placeholder.",
      "remediation": "Use a secrets manager to retrieve the secret at runtime."
    },
    {
      "rule_id": "python.sqlalchemy.security.sqlalchemy-execute-raw-query",
      "severity": "CRITICAL",
      "base_confidence": 0.612,
      "final_confidence": 0.912,
      "is_false_positive": false,
      "reasoning": "SQL injection — user input concatenated into query.",
      "remediation": "Use parameterized queries."
    }
  ],
  "confirmed_active_problems": [
    "HARDCODED_SECRET",
    "WEAK_CRYPTOGRAPHY"
  ]
}
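
As one way to act on this report in CI (a minimal sketch, not part of the repository; the file name and gating policy are assumptions):

# Illustrative CI gate: fail the pipeline when the agent reports high or critical risk.
import json
import sys

with open("security_report.json") as fh:   # hypothetical path where the report was saved
    report = json.load(fh)

confirmed = [f for f in report["findings"] if not f.get("is_false_positive")]
print(f"{len(confirmed)} confirmed finding(s); overall risk: {report['overall_risk']}")

if report["overall_risk"] in {"HIGH", "CRITICAL"}:
    sys.exit(1)  # block the merge, per the PR-scan purpose described above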

🧠 AI & Memory Storage

The agent maintains a memory/history.json file that persists across scans:

  1. File Context: Tracks which files attract the most vulnerabilities over time.
  2. Human Verdicts: If a user marks a finding as a false positive, the agent learns and suppresses that finding in future scans.
  3. Confidence Boosting: Confirmed findings raise the base_confidence of similar future findings by +25%.
  4. False Positive Penalty: Dismissed findings apply a permanent -40% penalty to that specific rule/file combination.
  5. Reflexion: High-priority scans trigger a second LLM pass where the agent critiques its own draft for improved accuracy.

🤝 Human-in-the-Loop (HITL) & Expert Feedback

The Security Agent is designed to learn from security experts through two feedback loops:

1. Expert Pattern Injection

Experts can add high-signal rules and custom security requirements to expert_patterns.txt:

  • This file is dynamically injected into the LLM's System Prompt at runtime (see the sketch after this list).
  • The LLM uses these expert instructions to prioritize specific vulnerabilities (e.g., "Always flag RSA keys smaller than 4096-bit").
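
A minimal sketch of that injection (prompt assembly actually happens in llm/prompts.py; the wording and base prompt below are assumptions):

# Illustrative expert-pattern injection; the real prompt builders live in llm/prompts.py.
from pathlib import Path

BASE_SYSTEM_PROMPT = "You are ARGUS, a security review specialist..."  # placeholder text

def build_system_prompt(patterns_file: str = "expert_patterns.txt") -> str:
    path = Path(patterns_file)
    expert_rules = path.read_text(encoding="utf-8").strip() if path.exists() else ""
    if not expert_rules:
        return BASE_SYSTEM_PROMPT
    return (
        BASE_SYSTEM_PROMPT
        + "\n\nExpert-provided rules (treat as high priority):\n"
        + expert_rules
    )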

2. Historical Verdict Influence

When a security expert reviews a scan result:

| Verdict | Effect |
| --- | --- |
| Confirmed | Boosts base_confidence of similar future findings in that file by +25% |
| False Positive | Applies a permanent -40% penalty for that specific rule/file combination |

The LLM is explicitly informed of historical verdicts in the User Message, enabling it to reason: "I am marking this as a False Positive because a human expert previously dismissed this specific rule in this file."
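
A minimal sketch of how those verdict modifiers might be applied during scoring (the real logic lives in scorer.py and memory/store.py; the exact formula is an assumption):

# Illustrative verdict-based confidence adjustment; actual logic lives in scorer.py / memory/store.py.
def adjust_confidence(base_confidence: float, verdict: str | None) -> float:
    """Apply the historical-verdict modifiers described above (+25% / -40%)."""
    if verdict == "confirmed":
        adjusted = base_confidence * 1.25   # an expert confirmed a similar finding in this file before
    elif verdict == "false_positive":
        adjusted = base_confidence * 0.60   # an expert dismissed this rule/file combination
    else:
        adjusted = base_confidence          # no history for this rule/file pair
    return max(0.0, min(1.0, adjusted))     # clamp into [0, 1]

assert round(adjust_confidence(0.6, "confirmed"), 2) == 0.75
assert round(adjust_confidence(0.8, "false_positive"), 2) == 0.48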


Version: 1.1.0  |  Agent Type: security_agent  |  Protocol: A2A/1.0
